package com.my.service.task.map;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.my.service.task.entity.BrandLike;
import com.my.service.task.kafka.KafkaEvent;
import com.my.service.task.log.ScanProductLog;
import com.my.service.task.util.HbaseUtils;
import com.my.service.task.utils.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class BrandLikeMap implements FlatMapFunction<KafkaEvent, BrandLike> {
    @Override
    public void flatMap(KafkaEvent kafkaEvent, Collector<BrandLike> collector) throws Exception {
        // data 是停留在某个商品上发来的日志内容 ScanProductLog
        String data = kafkaEvent.getWord();
        // 将停留在某个商品上发来的日志信息解析为一个object
        ScanProductLog scanProductLog = JSONObject.parseObject(data, ScanProductLog.class);
        int userId = scanProductLog.getUserId();
        String brand = scanProductLog.getBrand();
        String tablename = "userflaginfo";
        String rowkey = userId + "";
        String familyname = "userbehavior";
        String column = "brandlist";
        // mapData是从Hbase中查询到的用户品牌偏好, 记录着用户每个品牌的购买次数
        String mapData = HbaseUtils.getdata(tablename, rowkey, familyname, column);
        Map<String,Long> map = new HashMap<>();
        // 如果Hbase中存在该用户的品牌偏好，那么先调出来
        if(StringUtils.isNotBlank(mapData)){
            map = JSONObject.parseObject(mapData, Map.class);
        }
        // 获取之前购买次数最多的brand
        String preMostLikeBrand = MapUtils.getKeyOfMaxValue(map);
        // 获取之前brand的购买次数
        long preBrandTime = map.get(brand) == null ? 0 : map.get(brand);
        // 更新当前brand的购买次数
        map.put(brand, preBrandTime + 1);
        String res = JSONObject.toJSONString(map);
        HbaseUtils.putdata(tablename,rowkey,familyname,column,res);
        // 获取现在购买次数最多的brand
        String nowMostLikeBrand = MapUtils.getKeyOfMaxValue(map);

        // 如果现在购买次数最多的brand相比之前发生了变化
        if(StringUtils.isNotBlank(preMostLikeBrand) && !preMostLikeBrand.equals(nowMostLikeBrand)){
            BrandLike brandLike = new BrandLike();
            brandLike.setBrand(preMostLikeBrand);
            // preMostLikeBrand 减上1
            brandLike.setCount(-1L);
            brandLike.setGroupbyfield("==brandlike==" + preMostLikeBrand);
            collector.collect(brandLike);
        }
        BrandLike brandLike = new BrandLike();
        brandLike.setBrand(nowMostLikeBrand);
        // nowMostLikeBrand 加上1
        brandLike.setCount(1L);
        brandLike.setGroupbyfield("==brandlike==" + nowMostLikeBrand);

        // 更新hbase中brandlike这一列，即用户当前的偏好
        column = "brandlike";
        assert nowMostLikeBrand != null;
        HbaseUtils.putdata(tablename, rowkey, familyname, column, nowMostLikeBrand);
    }
}
